/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.service.engine.impl;

import com.chinamobile.cmss.lakehouse.common.Constants;
import com.chinamobile.cmss.lakehouse.common.dto.ExecuteSQLBean;
import com.chinamobile.cmss.lakehouse.common.dto.engine.KubernetesCommonTaskReq;
import com.chinamobile.cmss.lakehouse.common.dto.engine.LakehouseResponse;
import com.chinamobile.cmss.lakehouse.common.enums.EngineType;
import com.chinamobile.cmss.lakehouse.common.enums.HttpStatus;
import com.chinamobile.cmss.lakehouse.common.enums.ResourceManagerEnum;
import com.chinamobile.cmss.lakehouse.common.enums.TaskStatusTypeEnum;
import com.chinamobile.cmss.lakehouse.common.enums.TaskType;
import com.chinamobile.cmss.lakehouse.common.kubernetes.ComputeResource;
import com.chinamobile.cmss.lakehouse.common.kubernetes.ResourceSizeConfig;
import com.chinamobile.cmss.lakehouse.common.utils.ClusterUtil;
import com.chinamobile.cmss.lakehouse.common.utils.KeyValue;
import com.chinamobile.cmss.lakehouse.common.utils.PropertyUtils;
import com.chinamobile.cmss.lakehouse.common.utils.TaskUtil;
import com.chinamobile.cmss.lakehouse.common.utils.spark.SparkSqlTblInfo;
import com.chinamobile.cmss.lakehouse.core.client.IKubernetesClient;
import com.chinamobile.cmss.lakehouse.core.config.KubeConfig;
import com.chinamobile.cmss.lakehouse.core.config.KubernetesConfiguration;
import com.chinamobile.cmss.lakehouse.core.config.S3Config;
import com.chinamobile.cmss.lakehouse.core.handler.K8sUriHandler;
import com.chinamobile.cmss.lakehouse.dao.HiveMetastoreConfigDao;
import com.chinamobile.cmss.lakehouse.dao.LakehouseClusterInfoDao;
import com.chinamobile.cmss.lakehouse.dao.SparkSQLTaskInfoDao;
import com.chinamobile.cmss.lakehouse.dao.entity.EngineTaskInfoEntity;
import com.chinamobile.cmss.lakehouse.dao.entity.HiveMetastoreConfigEntity;
import com.chinamobile.cmss.lakehouse.dao.entity.LakehouseClusterInfoEntity;
import com.chinamobile.cmss.lakehouse.service.engine.EngineService;
import com.chinamobile.cmss.lakehouse.service.engine.ILogService;
import com.chinamobile.cmss.lakehouse.service.engine.ResourceManagerHandler;
import com.chinamobile.cmss.lakehouse.service.engine.SparkTaskService;
import com.chinamobile.cmss.lakehouse.service.redis.RedisConfig;
import com.chinamobile.cmss.lakehouse.service.redis.RedisOperateClient;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.orm.ObjectOptimisticLockingFailureException;
import org.springframework.stereotype.Service;

/**
 * Spark task lifecycle service.
 *
 * <p>Responsibilities, each driven by a scheduler or an API call:
 * <ul>
 *   <li>{@link #acceptTask} — persist an incoming SQL request as an ACCEPTED task;</li>
 *   <li>{@link #allocateJdbcResource}/{@link #releaseJdbcResource} — size and account
 *       engine resources for JDBC sessions (running-task counter lives in Redis);</li>
 *   <li>{@link #submitTask} — push ACCEPTED tasks to Kubernetes when resources allow;</li>
 *   <li>{@link #monitorTask} — poll driver pods and propagate their phase to task status;</li>
 *   <li>{@link #recoverTask} — re-submit or fail tasks whose submission failed.</li>
 * </ul>
 */
@Slf4j
@Service
public class SparkTaskServiceImpl implements SparkTaskService {

    private final String defaultQueueRootName = "root";
    // default cores per executor; getDefaultExecutorCpu may raise it to 5 when CPU is plentiful
    private final int defaultExecutorCpu = 4;
    // minimum ms between a failed submit and the retry attempt
    private final int submitRetryInterval = 50000;
    private final int submitMaxRetryTimes = 3;
    // a missing driver pod is only declared LOST after this many ms since submit
    private final int lossCheckInterval = 300000;

    private final String sqlMainClass = PropertyUtils.getString(Constants.ENGINE_SQL_APP_MAIN_CLASS);
    private final String sqlAppSource = PropertyUtils.getString(Constants.ENGINE_SQL_APP_RESOURCE);
    private final String resourceManager = PropertyUtils.getString(Constants.RESOURCE_MANAGER);
    private final Boolean isAwsPathStyleAccess = PropertyUtils.getBoolean(Constants.ENGINE_AWS_PATH_STYLE_ACCESS);

    @Autowired
    private SparkSQLTaskInfoDao sparkSQLTaskInfoDao;

    @Autowired
    private LakehouseClusterInfoDao clusterInfoDao;

    @Autowired
    private HiveMetastoreConfigDao metastoreConfigDao;

    @Autowired
    private RedisOperateClient redisClient;

    @Autowired
    private ResourceManagerHandler resourceManagerHandler;

    @Autowired
    private IKubernetesClient kubernetesClient;

    @Autowired
    private EngineService engineService;

    @Autowired
    private ILogService iLogService;

    @Autowired
    private RedisConfig redisConfig;

    @Autowired
    private KubeConfig kubeConfig;

    @Autowired
    private K8sUriHandler k8sUriHandler;

    @Autowired
    private S3Config s3Config;

    /**
     * Persists an incoming SQL request as a new ACCEPTED task; the periodic
     * {@link #submitTask} job will pick it up and submit it to Kubernetes.
     *
     * @param req the SQL execution request (instance, SQL text, db, submitting user)
     * @return the generated task id
     */
    @Override
    public String acceptTask(ExecuteSQLBean req) {
        String instance = req.getInstance();
        String sqlContent = req.getSqlContext();
        String driverPod = TaskUtil.generateDriverPod();
        String taskId = TaskUtil.generateTaskId();

        EngineTaskInfoEntity taskInfo = new EngineTaskInfoEntity();
        taskInfo.setSqlContent(sqlContent);
        taskInfo.setInstance(instance);
        taskInfo.setDbName(req.getDbName());
        taskInfo.setDriverPodId(driverPod);
        taskInfo.setTaskId(taskId);
        taskInfo.setQueryMetastoreURI(k8sUriHandler.getInternalMetaUrl());
        taskInfo.setQueryMetastoreCatalog(instance);
        taskInfo.setSubmitUser(req.getSubmitUser());
        taskInfo.setSubmitUserId(req.getUserId());

        // always be the same values
        taskInfo.setEngineType(EngineType.SPARK.getType());
        taskInfo.setTaskType("DML");

        taskInfo.setQueryRedisHost(redisConfig.getHost());
        taskInfo.setQueryRedisPort(String.valueOf(redisConfig.getPort()));

        // for sql: the SQL text itself is the single engine argument
        taskInfo.setEngineArgs(Arrays.asList(sqlContent));
        taskInfo.setMainClass(sqlMainClass);
        taskInfo.setFilePath(sqlAppSource);

        // the driver pod name doubles as the Redis key for query results
        taskInfo.setQueryRedisKey(driverPod);
        taskInfo.setStatusTypeEnum(TaskStatusTypeEnum.ACCEPTED);

        sparkSQLTaskInfoDao.save(taskInfo);

        return taskId;
    }

    /**
     * Reserves resources for a JDBC session on the given cluster and returns the
     * Spark configuration describing the allocation.
     *
     * <p>The Redis running-task counter is incremented up-front and rolled back on
     * every failure path so a failed allocation never leaks a slot.
     *
     * @param clusterName the cluster (instance) to allocate on
     * @return success with the generated spark config, or a failure response
     */
    @Override
    public LakehouseResponse allocateJdbcResource(String clusterName) {
        LakehouseClusterInfoEntity cluster = clusterInfoDao.findByInstance(clusterName);
        Map<String, String> engineConfig = new HashMap<>();
        if (null == cluster) {
            log.error("{} : Cluster not found", clusterName);
            return new LakehouseResponse(HttpStatus.NOT_FOUND, "Cluster not found");
        } else {
            try {
                redisClient.updateRunningTask(cluster.getInstance(), 1);
                KeyValue<Integer, KeyValue<Integer, ComputeResource>> result =
                    resourceManagerHandler.allocate(cluster, TaskType.JDBC);
                if (result.key() <= 0) {
                    redisClient.updateRunningTask(cluster.getInstance(), -1);
                    return LakehouseResponse.fail(HttpStatus.INTERNAL_SERVER_ERROR.getCode(), "No enough resource to start jdbc.", engineConfig);
                }
                if (result.value().value().getCpu() < 1) {
                    // roll back the counter incremented above; previously this path leaked a slot
                    redisClient.updateRunningTask(cluster.getInstance(), -1);
                    return LakehouseResponse.fail(HttpStatus.INTERNAL_SERVER_ERROR.getCode(),
                        String.format("Spark driver cpus: %s must be positive", result.value().value().getCpu()), engineConfig);
                }
                generateEngineConfig(engineConfig, cluster.getUserId(), result.key(), result.value().key(),
                    result.value().value(), clusterName);
                log.info("Success get jdbc resource, driver: {}, {}",
                    engineConfig.get("spark.driver.cores"), engineConfig.get("spark.driver.memory"));
                return LakehouseResponse.success(engineConfig);
            } catch (Exception exception) {
                redisClient.updateRunningTask(cluster.getInstance(), -1);
                log.error("Failed to allocate resource for jdbc.", exception);
                return LakehouseResponse.fail(HttpStatus.INTERNAL_SERVER_ERROR.getCode(), exception.getMessage(), engineConfig);
            }
        }
    }

    /**
     * Releases one JDBC slot on the given cluster by decrementing the Redis
     * running-task counter.
     *
     * @param instance the cluster (instance) name
     * @return success with the counter value after the decrement, or NOT_FOUND
     */
    @Override
    public LakehouseResponse releaseJdbcResource(String instance) {
        LakehouseClusterInfoEntity cluster = clusterInfoDao.findByInstance(instance);
        if (null == cluster) {
            log.error("{} : Cluster not found", instance);
            return LakehouseResponse.fail(HttpStatus.NOT_FOUND.getCode(), "Cluster not found", redisClient.getRunningTask(instance));
        } else {
            // reduce running task
            long current = redisClient.updateRunningTask(instance, -1);
            return LakehouseResponse.success(current);
        }
    }

    /**
     * Submits all ACCEPTED tasks whose cluster is running and has enough resource.
     * Tasks on a missing/unhealthy cluster are marked FAILED; tasks without
     * resources stay ACCEPTED and will be retried on the next run. Any per-task
     * exception marks only that task SUBMIT_FAILED and never blocks the others.
     */
    @Override
    public void submitTask() {
        List<EngineTaskInfoEntity> tasks = sparkSQLTaskInfoDao.findAcceptedTasks();
        if (tasks.isEmpty()) {
            log.trace("No task need to be submitted!");
            return;
        }
        List<EngineTaskInfoEntity> taskUpdate = new ArrayList<>();
        tasks.forEach(task -> {
            try {
                LakehouseClusterInfoEntity cluster = clusterInfoDao.findByInstance(task.getInstance());
                if (null == cluster || !cluster.getStatus().equalsIgnoreCase("running")) {
                    log.error("{} : Cluster not found or in error status", task.getInstance());
                    task.setSubmitTime(new Date());
                    task.setStatusTypeEnum(TaskStatusTypeEnum.FAILED);

                    taskUpdate.add(task);
                } else {
                    KeyValue<Integer, KeyValue<Integer, ComputeResource>> result =
                        resourceManagerHandler.allocate(cluster, TaskType.CONSOLE);
                    if (result.key() > 0) {
                        // if has enough resource, try to submit.
                        trySubmitTask(task, cluster.getUserId(), result.key(), result.value().key(), result.value().value());
                        redisClient.updateRunningTask(cluster.getInstance(), 1);

                        taskUpdate.add(task);
                    } else {
                        log.warn("There are no enough resource for task:{}", task.getId());
                    }
                }
            } catch (Exception e) {
                // any exception will not block other task; pass e last so the
                // stack trace is logged (a {} placeholder would swallow it)
                log.error("For cluster:{} submit sql:{} failed", task.getInstance(),
                    task.getSqlContent(), e);
                // Make submit failed
                task.setSubmitTime(new Date());
                task.setStatusTypeEnum(TaskStatusTypeEnum.SUBMIT_FAILED);
                task.setRetryTimes(task.getRetryTimes() + 1);

                taskUpdate.add(task);
            }
        });
        if (!taskUpdate.isEmpty()) {
            updateAll(taskUpdate);
            List<Long> success = tasks.stream().filter(s -> TaskStatusTypeEnum.SUBMITTED.equals(s.getStatusTypeEnum()))
                .map(EngineTaskInfoEntity::getId).collect(Collectors.toList());
            if (!success.isEmpty()) {
                log.info("Success submit tasks: {}", success);
            }
            List<Long> failed = tasks.stream()
                .filter(s -> TaskStatusTypeEnum.SUBMIT_FAILED.equals(s.getStatusTypeEnum())
                    || TaskStatusTypeEnum.FAILED.equals(s.getStatusTypeEnum()))
                .map(EngineTaskInfoEntity::getId).collect(Collectors.toList());
            if (!failed.isEmpty()) {
                log.warn("Failed submit tasks: {}", failed);
            }
        }
    }

    /**
     * Polls the driver pod of every unfinished task and maps its Kubernetes phase
     * to a task status. A pod missing for longer than {@link #lossCheckInterval}
     * marks the task LOST. Running-task counter deltas are batched per Redis key
     * and applied once at the end.
     */
    @Override
    public void monitorTask() {
        List<EngineTaskInfoEntity> unFinished = sparkSQLTaskInfoDao.findUnFinishedTasks();
        if (unFinished.isEmpty()) {
            log.trace("No un-finish task!");
            return;
        }
        Map<String, Long> runningTaskChange = new HashMap<>();
        unFinished.forEach(task -> {
            String namespace = ClusterUtil.convertCluster2Namespace(task.getInstance());
            try {
                String redisKey = redisClient.generateRedisKey(task.getInstance());
                // driver pod need exist
                if (kubernetesClient.existPod(namespace, task.getDriverPodId())) {

                    TaskStatusTypeEnum newStatus =
                        getCurrentStatus(
                            kubernetesClient.readNamespacedPod(namespace, task.getDriverPodId()).get().getStatus().getPhase());
                    // status changed
                    if (!task.getStatusTypeEnum().equals(newStatus)) {
                        log.info("Task:{} status changed {} ------> {}", task.getId(), task.getStatusTypeEnum(),
                            newStatus);
                        task.setStatusTypeEnum(newStatus);
                        if (endedTypes().contains(newStatus.name())) {
                            // try to get pod end time; a not-yet-terminated container
                            // NPEs here and is retried next round via the catch below
                            Date finishTime =
                                kubernetesClient.readNamespacedPod(namespace, task.getDriverPodId()).get().getStatus()
                                    .getContainerStatuses().get(0).getState().getTerminated().getFinishedAt().toDate();
                            task = updateTaskWhenEnd(task, finishTime);
                            runningTaskChange.put(redisKey, runningTaskChange.getOrDefault(redisKey, 0L) - 1);
                        }
                    }
                } else if (Duration.between(task.getSubmitTime().toInstant(), Instant.now())
                    .toMillis() > lossCheckInterval) {
                    // loss
                    log.warn("Driver pod:{}:{} not exist, changed status {} ------> {}", namespace,
                        task.getDriverPodId(), task.getStatusTypeEnum(), TaskStatusTypeEnum.LOST);
                    task.setStatusTypeEnum(TaskStatusTypeEnum.LOST);
                    runningTaskChange.put(redisKey, runningTaskChange.getOrDefault(redisKey, 0L) - 1);
                }
            } catch (Exception e) {
                // any exception will not block others; log with stack trace
                log.error("Monitor catch exception, namespace:{} pod:{}", namespace,
                    task.getDriverPodId(), e);
            }
        });
        redisClient.updateRunningTask(runningTaskChange);
        updateAll(unFinished);
    }

    /**
     * Re-examines SUBMIT_FAILED tasks: if the engine log file already exists the
     * submit actually succeeded (status back to SUBMITTED, monitor takes over);
     * otherwise the task goes back to ACCEPTED for a fresh submit. A task that has
     * exhausted {@link #submitMaxRetryTimes} retries is marked FAILED.
     */
    @Override
    public void recoverTask() {
        List<EngineTaskInfoEntity> failedTasks = sparkSQLTaskInfoDao.findSubmitFailedTasks();
        if (failedTasks.isEmpty()) {
            log.trace("No failed task need to re-submit!");
            return;
        }
        failedTasks.forEach(task -> {
            try {
                // Verify and resubmit after 50 seconds
                Date submitTime = task.getSubmitTime();
                if (Duration.between(submitTime.toInstant(), Instant.now())
                    .toMillis() > submitRetryInterval && task.getRetryTimes() <= submitMaxRetryTimes) {
                    LakehouseResponse res = iLogService.checkStartSuccess(task.getLogPath());
                    if (res.getCode().equals(HttpStatus.OK.getCode())) {
                        // if log generated, change to submitted and monitor will change it to right status;
                        task.setStatusTypeEnum(TaskStatusTypeEnum.SUBMITTED);
                        log.info("Log file found. Task:{} status changed {} ------> {}", task.getId(),
                            TaskStatusTypeEnum.SUBMIT_FAILED, TaskStatusTypeEnum.SUBMITTED);
                    } else {
                        // log not found, change to accept and will re-submit it
                        task.setStatusTypeEnum(TaskStatusTypeEnum.ACCEPTED);
                        log.info(
                            "Log file not found, will re-submit task. Task:{} status changed {} ------> {}",
                            task.getId(), TaskStatusTypeEnum.SUBMIT_FAILED, TaskStatusTypeEnum.ACCEPTED);
                    }
                } else if (task.getRetryTimes() > submitMaxRetryTimes) {
                    // Retries exhausted, make it failed (message uses the constant,
                    // not a hard-coded count).
                    task.setStatusTypeEnum(TaskStatusTypeEnum.FAILED);
                    log.warn("Task:{} failed after {} times submit, status changed {} ------> {}",
                        task.getTaskId(), submitMaxRetryTimes, TaskStatusTypeEnum.SUBMIT_FAILED, TaskStatusTypeEnum.FAILED);
                }
            } catch (Exception e) {
                // any exception will not block others; log with stack trace
                log.error("Recover catch exception, task:{}", task.toString(), e);
                // Mark failed directly
                task.setStatusTypeEnum(TaskStatusTypeEnum.FAILED);
            }
        });
        updateAll(failedTasks);
    }

    /** Builds the default s3a filesystem URI from the configured bucket. */
    private String getDefaultFsUri() {
        return "s3a://" + s3Config.getBucket();
    }

    /**
     * Derives (executorInstances, (executorCores, driverResource)) from the
     * cluster's total compute resource and its task parallelism.
     *
     * <p>Per-task share is total/parallelize; the driver initially gets ~1/5 of it
     * (clamped to [1, 4] cores), then absorbs whatever the executors leave unused.
     * NOTE(review): the sanity checks use {@code assert}, which is a no-op unless
     * the JVM runs with -ea — confirm that is intended in production.
     *
     * @param clusterInfo cluster whose compute-resource size and parallelism to use
     * @return KeyValue of instanceNum -> (executorCpu -> driver ComputeResource)
     */
    protected KeyValue<Integer, KeyValue<Integer, ComputeResource>> getInstanceNumAndDriverResource(LakehouseClusterInfoEntity clusterInfo) {
        // get parallelize
        int parallelize = clusterInfo.getTaskParallelize();
        int scale = ResourceSizeConfig.resourceScale;

        ComputeResource computeResource =
            ResourceSizeConfig.getResource(clusterInfo.getComputeResourceSize());
        int requestCpu = computeResource.getCpu() / parallelize;
        int requestMem = computeResource.getMemory() / parallelize;
        assert requestCpu > 1 && requestMem > 1 * scale : String
            .format("parallelize(%d) is too large, no enough resource to compute", parallelize);
        // drive min 1c 4g, max 4c 16g
        // driver will request 1/5 resource
        int driverCpu = Math.min(4, Math.max(1, requestCpu / 5));
        int driverMem = driverCpu * scale;

        int remainingCpu = requestCpu - driverCpu;
        int remainingMem = requestMem - driverMem;

        assert remainingCpu > 0 && remainingMem > 0 : "Resource must be positive.";

        // default set cpu 4 for one executor, at least 1
        int executorCpu = getDefaultExecutorCpu(remainingCpu);
        int instanceNum = Math.max(1, Math.min(remainingCpu / executorCpu,
            remainingMem / (executorCpu * ResourceSizeConfig.resourceScale)));
        // driver takes everything the executors do not consume
        driverCpu = requestCpu - instanceNum * executorCpu;
        driverMem = requestMem - instanceNum * executorCpu * ResourceSizeConfig.resourceScale;
        return KeyValue.of(instanceNum, KeyValue.of(executorCpu,
            ComputeResource.builder().memory(driverMem).cpu(driverCpu).build()));
    }

    /**
     * Fills {@code sparkConf} with the full Spark-on-Kubernetes configuration for
     * a JDBC engine: master/deploy mode, resources, hive/metastore, optional
     * Yunikorn queue labels, remote-submit token and node selectors.
     */
    private void generateEngineConfig(Map<String, String> sparkConf, String routerUrl, int instanceNum,
                                      int executorCpus, ComputeResource driverResource, String clusterName) {
        String namespace = ClusterUtil.convertCluster2Namespace(clusterName);
        String router = ClusterUtil.convertCluster2Namespace(routerUrl);

        sparkConf.put("spark.master", KubernetesConfiguration.getK8sDefaultClusterInfo());
        sparkConf.put("spark.submit.deployMode", "cluster");
        // set resource
        sparkConf.put("spark.driver.cores", String.valueOf(driverResource.getCpu()));
        sparkConf.put("spark.driver.memory", String.format("%dG", driverResource.getMemory()));
        sparkConf.put("spark.executor.cores", String.valueOf(executorCpus));
        sparkConf.put("spark.executor.memory",
            String.format("%dG", executorCpus * ResourceSizeConfig.resourceScale));
        sparkConf.put("spark.executor.instances", String.valueOf(instanceNum));

        // TODO support hdfs, set config of hdfs
        sparkConf.put("spark.hadoop.fs.defaultFS", getDefaultFsUri());

        // set hive
        sparkConf.put("spark.hadoop.hive.default.fileformat", "parquet");
        sparkConf.put("spark.hive.metastore.uris", k8sUriHandler.getInternalMetaUrl());
        sparkConf.put("spark.sql.default.database", clusterName);

        // if yunikorn, add config
        if (ResourceManagerEnum.YunikornResourceManager.getValue()
            .equals(resourceManager)) {
            sparkConf.put("spark.kubernetes.executor.scheduler.name", "yunikorn");
            String queueName =
                String.format("%s.%s", defaultQueueRootName,
                    ClusterUtil.convertCluster2Namespace(clusterName));
            sparkConf.put("spark.kubernetes.driver.label.queue", queueName);
            sparkConf.put("spark.kubernetes.executor.label.queue", queueName);
        }

        // add remote submit config
        if (StringUtils.isNotBlank(kubeConfig.getAccessToken())) {
            sparkConf.put("spark.kubernetes.trust.certificates", "true");
            sparkConf.put("spark.kubernetes.authenticate.submission.oauthToken",
                kubeConfig.getAccessToken());
        }

        // k8s
        sparkConf.put("spark.kubernetes.namespace", namespace);
        sparkConf.put("spark.kubernetes.container.image", KubernetesConfiguration.sparkImage);
        // add selector
        if (StringUtils.isNotBlank(kubeConfig.getNodeSelector())) {
            for (Map.Entry<String, String> kv : KubernetesConfiguration.getNodeSelector().entrySet()) {
                sparkConf.put(String.format("spark.kubernetes.node.selector.%s", kv.getKey()),
                    kv.getValue());
            }
        }
    }

    /**
     * Builds the Kubernetes submit request for one task and calls the engine
     * service. On success the task becomes SUBMITTED with its log path recorded;
     * on failure it becomes SUBMIT_FAILED with retryTimes incremented. The caller
     * is responsible for persisting the mutated task.
     */
    protected void trySubmitTask(EngineTaskInfoEntity task, String routerInfo, int instanceNum, int executorCpus,
                                 ComputeResource driverResource) {
        KubernetesCommonTaskReq req = new KubernetesCommonTaskReq();
        String namespace = ClusterUtil.convertCluster2Namespace(task.getInstance());
        req.setMaster(KubernetesConfiguration.getK8sDefaultClusterInfo());
        req.setDriverMemory(String.format("%dG", driverResource.getMemory()));
        req.setDriverCores(driverResource.getCpu());
        req.setExecutorCores(executorCpus);
        req.setExecutorMemory(String.format("%dG", executorCpus * ResourceSizeConfig.resourceScale));
        req.setInstances(instanceNum);
        req.setMetastoreUris(task.getQueryMetastoreURI());
        req.setNamespace(namespace);
        // use default sa. Use spark sa if use Yunikorn.
        req.setImage(KubernetesConfiguration.sparkImage);
        req.setDriverPodName(task.getDriverPodId());
        req.setDefaultDatabase(task.getDbName());
        req.setProxyUser(task.getSubmitUser());

        if (null != task.getEngineArgs() && !task.getEngineArgs().isEmpty()) {
            req.setEngineArgs(task.getEngineArgs());
        }

        Map<String, String> sparkConfig = new HashMap<>();
        sparkConfig.put("spark.hadoop.fs.defaultFS", getDefaultFsUri());
        // set hive default format
        sparkConfig.put("spark.hadoop.hive.default.fileformat", "parquet");
        if (StringUtils.isNotEmpty(task.getQueryRedisHost())) {
            sparkConfig.put("spark.redis.enable", "true");
            sparkConfig.put("spark.redis.host", task.getQueryRedisHost());
            sparkConfig.put("spark.redis.port", task.getQueryRedisPort());
            if (StringUtils.isNoneEmpty(redisConfig.getPassword())) {
                sparkConfig.put("spark.redis.auth", redisConfig.getPassword());
            }
        }

        // if has oss table
        // check jar task; equalsIgnoreCase is null-safe (old toLowerCase form NPE'd
        // on a null task type)
        if ("jar".equalsIgnoreCase(task.getTaskType())) {
            sparkConfig.put("spark.hadoop.fs.s3a.path.style.access",
                String.valueOf(isAwsPathStyleAccess));
        } else if (StringUtils.isNoneEmpty(task.getSqlContent())) {
            // Map<String, String> ossConfig = checkOss(task.getSqlContent(), task.getDefaultDb());
            Map<String, String> ossConfig = new HashMap<>();
            ossConfig.put("spark.hadoop.fs.s3a.endpoint", s3Config.getEndpoint());
            ossConfig.put("spark.hadoop.fs.s3a.access.key", s3Config.getAccessKey());
            ossConfig.put("spark.hadoop.fs.s3a.secret.key", s3Config.getSecretKey());
            if (isAwsPathStyleAccess) {
                ossConfig.put("spark.hadoop.fs.s3a.path.style.access", String.valueOf(true));
            }
            sparkConfig.putAll(ossConfig);
        }

        // if yunikorn, add config
        if (ResourceManagerEnum.YunikornResourceManager.getValue()
            .equals(resourceManager)) {
            sparkConfig.put("spark.kubernetes.executor.scheduler.name", "yunikorn");
            String queueName = String.format("%s.%s", defaultQueueRootName,
                ClusterUtil.convertCluster2Namespace(task.getInstance()));
            sparkConfig.put("spark.kubernetes.driver.label.queue", queueName);
            sparkConfig.put("spark.kubernetes.executor.label.queue", queueName);
        }

        // add remote submit config
        if (StringUtils.isNotBlank(kubeConfig.getAccessToken())) {
            sparkConfig.put("spark.kubernetes.trust.certificates", "true");
            sparkConfig.put("spark.kubernetes.authenticate.submission.oauthToken",
                kubeConfig.getAccessToken());
        }

        // add selector
        if (StringUtils.isNotBlank(kubeConfig.getNodeSelector())) {
            for (Map.Entry<String, String> kv : KubernetesConfiguration.getNodeSelector().entrySet()) {
                sparkConfig.put(String.format("spark.kubernetes.node.selector.%s", kv.getKey()),
                    kv.getValue());
            }
        }

        // task-level config wins over everything generated above
        sparkConfig.putAll(task.getEngineConfig());
        req.setMainClass(task.getMainClass());
        req.setJarPath(task.getFilePath());

        req.setConfig(sparkConfig);

        log.info("Try to submit task to cluster {} with args: {}", KubernetesConfiguration.getK8sDefaultClusterInfo(), req.toString());
        LakehouseResponse<String> res = engineService.submitSQLTaskOnK8s(req);
        if (!res.getCode().equals(HttpStatus.OK.getCode())) {
            log.error("Failed to submit task, response status {} message: {}", res.getCode(), res.getMessage());
            // Make submit failed
            task.setSubmitTime(new Date());
            task.setStatusTypeEnum(TaskStatusTypeEnum.SUBMIT_FAILED);
            task.setRetryTimes(task.getRetryTimes() + 1);
        } else {
            task.setLogPath(res.getData());
            task.setSubmitTime(new Date());
            task.setStatusTypeEnum(TaskStatusTypeEnum.SUBMITTED);
        }
    }

    /**
     * Scans the SQL for referenced tables and, when one is backed by S3, returns
     * that table's s3a endpoint/credentials as spark config (empty map otherwise).
     *
     * @param sqlContent  SQL text to scan for table references
     * @param clusterName cluster used as the default catalog
     * @return spark.hadoop.fs.s3a.* entries, possibly empty
     */
    public Map<String, String> checkOss(String sqlContent, String clusterName) {
        List<KeyValue<String, String>> tables = SparkSqlTblInfo.getTables(sqlContent, clusterName);

        Map<String, String> sparkConfig = new HashMap<>();

        if (!tables.isEmpty()) {
            List<HiveMetastoreConfigEntity> configPOS = metastoreConfigDao.findByTableConfigs(tables);
            Optional<HiveMetastoreConfigEntity> po = configPOS.stream()
                .filter(entity -> Constants.S3A_ENDPOINT.equals(entity.getParamKey())).findFirst();
            if (po.isPresent()) {
                // if has s3 table, add its endpoint/key/secret for that table only
                configPOS.stream().filter(e -> po.get().getId().getTblId() == e.getId().getTblId())
                    .filter(e -> Arrays.asList(Constants.S3A_ENDPOINT, Constants.S3A_ACCESS_KEY,
                        Constants.S3A_SECRET_KEY).contains(e.getParamKey()))
                    .forEach(e -> sparkConfig.put(String.format("spark.hadoop.%s", e.getParamKey()),
                        e.getParamValue()));
                if (isAwsPathStyleAccess) {
                    sparkConfig.put("spark.hadoop.fs.s3a.path.style.access",
                        String.valueOf(true));
                }
            }
        }
        return sparkConfig;
    }

    /**
     * Persists all given tasks. On an optimistic-lock conflict, falls back to
     * saving one-by-one so the stale entity loses quietly and the rest still land.
     *
     * @return the entities that were actually saved
     */
    protected List<EngineTaskInfoEntity> updateAll(List<EngineTaskInfoEntity> pos) {
        try {
            return sparkSQLTaskInfoDao.saveAll(pos);
        } catch (ObjectOptimisticLockingFailureException e) {
            log.warn(e.getMessage());
            List<EngineTaskInfoEntity> changed = new ArrayList<>();
            pos.stream().forEach(po -> {
                try {
                    EngineTaskInfoEntity c = sparkSQLTaskInfoDao.save(po);
                    changed.add(c);
                } catch (Exception ignored) {
                    // deliberately best-effort: a stale version just drops out
                }
            });
            return changed;
        }
    }

    /**
     * Stamps an ended task with its finish time and the derived wait/run
     * durations (in seconds). Mutates and returns the same entity.
     */
    protected EngineTaskInfoEntity updateTaskWhenEnd(EngineTaskInfoEntity task, Date finishTime) {
        task.setFinishTime(finishTime);
        task.setWaitTime(
            Duration.between(task.getCreateTime().toInstant(), task.getSubmitTime().toInstant())
                .getSeconds());
        task.setRunTime(Duration
            .between(task.getSubmitTime().toInstant(), finishTime.toInstant()).getSeconds());
        return task;
    }

    /** Status names that count as terminal for a driver pod. */
    private List<String> endedTypes() {
        return Arrays.asList(TaskStatusTypeEnum.FAILED.name(), TaskStatusTypeEnum.FINISHED.name());
    }

    /** Picks 5 cores per executor for large remainders, else the 4-core default. */
    private int getDefaultExecutorCpu(int remainingCpu) {
        if (remainingCpu > 20) {
            return 5;
        } else {
            return defaultExecutorCpu;
        }
    }

    /** Maps a Kubernetes pod phase string to the internal task status. */
    private TaskStatusTypeEnum getCurrentStatus(String status) {
        switch (status) {
            case "Pending":
                return TaskStatusTypeEnum.SUBMITTED;
            case "Running":
                return TaskStatusTypeEnum.RUNNING;
            case "Succeeded":
                return TaskStatusTypeEnum.FINISHED;
            case "Failed":
                return TaskStatusTypeEnum.FAILED;
            case "Unknown":
            default:
                return TaskStatusTypeEnum.UNKNOWN;
        }
    }
}
